-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(stream): send barriers in batch when possible #19932
base: main
Are you sure you want to change the base?
Conversation
997af79
to
08e83f2
Compare
08e83f2
to
366a139
Compare
The cluster is running a release build, 256 parallelism in total, deployed via risedev in a 16c32GB EC2: d7b5cf5 The comparison below shows that
An additional test not included in the figure indicates that the main branch experiences the same issue as when |
let limit = (self.context.config.developer).exchange_concurrent_dispatchers; | ||
|
||
// Only barrier can be batched for now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not strongly related to this PR: I wonder whether peaking the message including barrier/chunk and batch them by size into one gRPC message can be a general optimization to amortize network cost.
Theoretically it is http2 and tcp under the hook so the network package should be batched already but given that the barrier batch optimization works, I wonder whether there are other overheads that we are not aware of in gRPC.
I also saw a few asks for gRPC streaming rpc message batching elsewhere:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether there are other overheads that we are not aware of in gRPC.
This PR also demonstrates the improvement when tested in a single compute node, where all channels are local.
but given that the barrier batch optimization works
This PR primarily reduces the total number of incoming message for both RemoteInput
and LocalInput
. However from the perspective of MergeExecutor
, the total number of message to poll doesn't change since BarrierBatch
has been reverted to multiple Barriers
for the output RemoteInput
and LocalInput
.
Regarding the source of performance gain in this PR, I see two:
- A batch of barriers only requires 1 barrier permit, consequently there're less permits to add back.
- Although the total number of messages remains unchanged for the
MergeExecutor
, the cost of polling a message can be reduced due to this very simple new state in the auto-generated future, i.e. less useless poll::pending.
src/stream/src/executor/mod.rs
Outdated
#[derive(Debug, EnumAsInner, PartialEq, Clone)] | ||
pub enum MessageBatch { | ||
Chunk(StreamChunk), | ||
BarrierBatch(Vec<BarrierInner<BarrierMutationType>>), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have some empirical idea of the length of barrier batch? If it's small, shall we consider SmallVec
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
During my tests, it remains in single digits for most of the time.
I'll use SmallVec<[T;8]> here.
Encountering large size difference between variants
on MessageBatchInner
when using SmallVec
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It turns out the stack size of Barrier
is considerable, so SmallVec<[Barrier; 8]>
can be significantly large.
let mut input = self.input.execute().peekable(); | ||
let mut end_of_stream = false; | ||
while !end_of_stream { | ||
let Some(msg) = input.next().await else { | ||
end_of_stream = true; | ||
continue; | ||
}; | ||
let mut barrier_batch = vec![]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to decouple the logic for batching messages from dispatching them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently it's done in following order:
- Poll one msg.
- Peek N consecutive barriers, only if the lastest msg is non-mutation barrier.
- Send the batch msg.
Could you please elaborate "decouple"?
This reverts commit ca0d6685f8c2e135c56b1d407ea5b4e74e42fed9.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rubber stamp. Any concerns?
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Excessive barrier messages between dispatcher and merger have been shown to hinder performance, by consuming CPU resources on message polling. This issue is significant when parallelism is high, because a barrier would result in NxM barrier messages between dispatcher and merger. Please refer to the benchmark attached below for further details.
This PR aims to reduce the number of barrier messages between dispatcher and merger, by combining consecutive non-mutation barriers into one message.
This PR primarily reduces the total number of incoming message for both RemoteInput and LocalInput. However from the perspective of MergeExecutor, the total number of message to poll doesn't change since BarrierBatch has been reverted to multiple Barriers for the output RemoteInput and LocalInput. So regarding the source of performance gain in this PR, I see two:
More discussion here.
Checklist
Documentation
Release note